The focus of this notebook is the development of a block-based method for computing distance matrices. The end goal is a method compatible with Fast UniFrac that has reasonable runtime and dramatically reduced space requirements relative to computing beta diversity in a single shot. The intuition is that "blocks" of the resulting distance matrix can be computed independently, and that computing a block of samples (e.g., 32 at a time) has dramatically lower memory requirements than computing all of the samples at once (e.g., thousands). The reason is that Fast UniFrac produces an internal data structure called the counts_array, which requires on the order of O(n * m * log(m)) space, where n is the number of samples and m is the number of OTUs. By only computing a subset of samples at a time, both n and m are reduced: the former simply by the number of samples being evaluated, and the latter because, in the general case, examining fewer samples means fewer OTUs need to be represented in the tree.
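As a purely illustrative back-of-the-envelope reading of that scaling, the sketch below uses the 10000 OTU by 1000 sample scale of the benchmark at the end of this notebook, plus a made-up assumption (not a measurement) that a single block of 2 * 256 samples only touches around 3000 OTUs:

In [ ]:
import math

# the benchmark at the end of this notebook uses roughly 1000 samples by 10000 OTUs
n_full, m_full = 1000, 10000
full = n_full * m_full * math.log2(m_full)

# one block is two 256-sample slices; the ~3000 OTUs is an assumption for
# illustration only, not a measurement
n_block, m_block = 2 * 256, 3000
block = n_block * m_block * math.log2(m_block)

print("full / block ~= %.1f" % (full / block))  # ~7.5 with these numbers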

There are still optimizations to be performed. The block method is set up so that it can be parallelized (each block is independent); the implementation below distributes blocks over an ipyparallel load-balanced view, and the benchmarks at the end vary the number of cores. In addition, the block method results in duplicated compute, with some positions in the distance matrix being computed ceil(n / k) times, where n is the number of samples and k is the block size. However, an individual pairwise calculation is cheap relative to the expense of creating a counts_array. This suggests using a large k, but it has been observed that a large k can drive runtime up, possibly because a larger number of OTUs must be represented within a block, which increases the space and time cost of constructing the counts_array.
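To make the duplication concrete, the sketch below (illustrative only, not part of the implementation) enumerates upper-triangle blocks the same way the _blockinator generator defined later does, and counts how often each sample pair is evaluated: pairs within the same block of samples are computed ceil(n / k) times, while pairs spanning two different blocks are computed once.

In [ ]:
from collections import Counter
from itertools import combinations_with_replacement

def toy_blocks(ids, k):
    # mirrors _blockinator below: upper-triangle blocks of size k
    for row in range(0, len(ids), k):
        for col in range(row, len(ids), k):
            yield set(ids[row:row + k]), set(ids[col:col + k])

toy_ids = list(range(12))  # 12 toy samples with a block size of 4
pair_counts = Counter()
for row_ids, col_ids in toy_blocks(toy_ids, 4):
    merged = sorted(row_ids | col_ids)
    # every pair within the merged block gets computed by beta_diversity
    pair_counts.update(combinations_with_replacement(merged, 2))

# within-block pairs: ceil(12 / 4) == 3 evaluations; cross-block pairs: 1
print(max(pair_counts.values()), min(pair_counts.values()))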

During development, it became apparent that a TreeNode is very expensive to represent in memory, and that Fast UniFrac doesn't actually need to operate on it directly. One of the bottlenecks identified was the TreeNode.shear method, which is used to take the subset of the tree corresponding to the OTUs represented by the samples within a block. More discussion on this new shear method can be found here.
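For reference, the array representation this shear operates on is the dict produced by TreeNode.to_array. A minimal look at its layout on a toy tree (the Newick string below is made up for illustration):

In [ ]:
import io
from skbio import TreeNode

toy = TreeNode.read(io.StringIO("((a:1,b:1)c:2,(d:1,e:1)f:2)root;"))
arr = toy.to_array()

# per-node 'id', 'name', and 'length' arrays, plus 'child_index' rows of
# (node_id, leftmost_child_id, rightmost_child_id) for each internal node
print(sorted(arr.keys()))
print(arr['name'])
print(arr['child_index'])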


In [31]:
from time import time
from random import shuffle
from collections import Counter
import os

import numpy as np
import psutil  # pip install psutil

from skbio import read, TreeNode, DistanceMatrix
from skbio.diversity import beta_diversity
from memory_profiler import memory_usage  # pip install memory_profiler
from biom import load_table

process = psutil.Process(os.getpid())

In [66]:
from ipyparallel import Client
import sys
from functools import partial
client = Client()
dv = client[:]
bv = client.load_balanced_view()

with dv.sync_imports():
    import numpy as np
    from skbio.diversity import beta_diversity

class MockTreeNode(object):
    """Lightweight stand-in for TreeNode that only exposes shear and to_array"""
    def __init__(self, original_tree_array):
        self.original_tree_array = original_tree_array

    def to_array(self, nan_length_value=0.0):
        import numpy as np
        self.original_tree_array['length'][np.isnan(self.original_tree_array['length'])] = nan_length_value
        return self.original_tree_array

    def shear(self, to_keep):
        return MockTreeNode(shear(self.original_tree_array, to_keep))
dv['MockTreeNode'] = MockTreeNode


def shear(indexed, to_keep):
    """Shear off nodes from a tree array
    
    Parameters
    ----------
    indexed : dict
        The result of TreeNode.to_array
    to_keep : set
        The tip IDs of the tree to keep
        
    Returns
    -------
    dict
        A TreeNode.to_array like dict with the exception that "id_index" is not
        provided, and any extraneous attributes formerly included are not 
        passed on.
    
    Notes
    -----
    Unlike TreeNode.shear, this method does not prune (i.e., collapse single
    descendent nodes). This is an open development target.
    
    This method assumes that to_keep is a subset of names in the tree.
    
    The order of the nodes remains unchanged.
    """
    import numpy as np
    
    # nodes to keep mask
    mask = np.zeros(len(indexed['id']), dtype=bool)

    # set any tips marked "to_keep"
    tips_to_keep = [i for i, n in enumerate(indexed['name']) if n in to_keep]
    mask[np.asarray(tips_to_keep)] = True

    # perform a post-order traversal and identify any nodes that should be 
    # retained
    new_child_index = []
    for node_idx, child_left, child_right in indexed['child_index']:
        being_kept = mask[child_left:child_right + 1]

        # NOTE: the second clause is an explicit test to keep the root node. This
        # may not be necessary and may be a remnant of mucking around.
        if being_kept.sum() >= 1 or node_idx == indexed['id'][-1]:
            mask[node_idx] = True

    # we now know what nodes to keep, so we can create new IDs for assignment
    new_ids = np.arange(mask.sum(), dtype=int)
    
    # construct a map that associates old node IDs to the new IDs
    id_map = {i_old: i_new for i_old, i_new in zip(indexed['id'][mask], new_ids)}

    # perform another post-order traversal to construct the new child index arrays
    # which provide index positions of the descendants of a given internal node.
    for node_idx, child_left, child_right in indexed['child_index']:
        being_kept = mask[child_left:child_right + 1]

        # NOTE: the second clause is an explicit test to keep the root node. This
        # may not be necessary and may be a remnant of mucking around.
        if being_kept.sum() >= 1 or node_idx == indexed['id'][-1]:
            new_id = id_map[node_idx]
            child_indices = indexed['id'][child_left:child_right + 1][being_kept]
            left_child = id_map[child_indices[0]]
            right_child = id_map[child_indices[-1]]
            new_child_index.append([new_id, left_child, right_child])

    new_child_index = np.asarray(new_child_index)

    return {'child_index': new_child_index,
            'length': indexed['length'][mask],
            'name': indexed['name'][mask],
            'id': new_ids}

dv['shear'] = shear

def _block_execute(metric, tree, table, ids):
    """Compute the pairwise distances for a single block of samples"""
    row_ids, col_ids = ids
    ids_to_keep = row_ids.union(col_ids)

    block = table.filter(ids_to_keep, inplace=False)
    block.filter(lambda v, i, md: v.sum() > 0, axis='observation')
    block_ids = block.ids()
    block_otu_ids = block.ids(axis='observation')
    block_tree = tree.shear(block_otu_ids)
    block_matrix = block.matrix_data.astype(int).T.toarray()

    block_dmat = beta_diversity(metric, block_matrix, block_ids,
                                tree=block_tree, otu_ids=block_otu_ids,
                                validate=False)
    
    return (row_ids, col_ids, block_dmat)

def _blockinator(ids, block_size):
    """Yield (row_ids, col_ids) sets covering the upper triangle in blocks"""
    for row_start in range(0, len(ids), block_size):
        for col_start in range(row_start, len(ids), block_size):
            row_ids = set(ids[row_start:row_start + block_size])
            col_ids = set(ids[col_start:col_start + block_size])
            yield (row_ids, col_ids)

            
def block_dist(tree, table, metric, block_size=64):
    """Perform a block-based computation of a distance matrix
    
    Parameters
    ----------
    tree : TreeNode-like object
        A Tree
    table : biom.Table
        A BIOM table of the samples and observations
    metric : str, one of {unweighted_unifrac, weighted_unifrac}
        The method to use
    block_size : int
        The size of the block in the resulting distance matrix to 
        compute at a time.
        
    Returns
    -------
    DistanceMatrix
        The computed distance matrix
    """
    global dv
    global bv
    
    table._sample_metadata = None
    table._observation_metadata = None
    
    _block_executer = partial(_block_execute, metric, tree, table)

    dv['_block_execute'] = _block_execute
    dv['_block_executer'] = _block_executer
    
    ids = table.ids()
    dmat = np.zeros((len(ids), len(ids)), dtype=float)
    dmat_index = {i: idx for idx, i in enumerate(ids)}
    
    reduce_total = 0
    for row_ids, col_ids, block_dmat in bv.map(_block_executer, list(_blockinator(ids, block_size)), chunksize=1):
        # fold this block's distances into the upper triangle of the full matrix
        reduce_time = time()
        for i in block_dmat.ids:
            i_idx = block_dmat.index(i)
            i_dmat_index = dmat_index[i]
            
            for j in block_dmat.ids[i_idx:]:
                j_idx = block_dmat.index(j)
                dmat[i_dmat_index, dmat_index[j]] = block_dmat.data[i_idx, j_idx]
        reduce_total += (time() - reduce_time)
    print(reduce_total)
    return DistanceMatrix(dmat + dmat.T, ids=ids)


importing numpy on engine(s)
importing beta_diversity from skbio.diversity on engine(s)
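
As a quick sanity check of the array-based shear defined above (this assumes the cell above has been run; the toy tree is illustrative only), retaining only tips a and b should keep those tips plus their ancestors, with no pruning of single-descendant nodes:

In [ ]:
import io
from skbio import TreeNode

toy = TreeNode.read(io.StringIO("((a:1,b:1)c:2,(d:1,e:1)f:2)root;"))
mock = MockTreeNode(toy.to_array())

sheared = mock.shear({'a', 'b'}).to_array()
print(sheared['name'])         # expected: a, b, their parent c, and the root
print(sheared['child_index'])  # child index re-numbered over the retained nodes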

In [62]:
def bench(tree, table, number_otus, number_samples, block_size, metric):
    """Benchmark the block and regular beta diversity methods
    
    Parameters
    ----------
    tree : path
        File path to the tree to load
    table : path
        File path to the table to load
    number_otus : int
        Number of OTUs to use in the test
    number_samples : int
        Number of samples to use in the test
    block_size : int
        The blocksize to use for the test
    metric : str, {unweighted_unifrac, weighted_unifrac}
        The beta diversity metric to use
    """
    # the time to read the tree and BIOM table
    start = time()
    tree = read(tree, into=TreeNode)
    table = load_table(table)

    for node in tree.traverse(include_self=False):
        if node.length is None:
            node.length = 0.0
    
    # aggressively clean up leaky variables so the original tree can be freed
    del node
    
    # get sample subset
    sample_ids = table.ids()

    # samples must have at least 1000 sequences
    sample_ids = [i for i, v in zip(sample_ids, table.sum(axis='sample')) if v >= 1000]
    shuffle(sample_ids)
    sample_ids = sample_ids[:number_samples]
    table.filter(sample_ids)
    
    # observations must exist in at least .1% of samples
    table.filter(lambda v, i, md: ((v != 0).sum() / len(sample_ids)) >= 0.001, axis='observation')

    # get otu subset of the tree
    table_obs_ids = table.ids(axis='observation')
    table_obs_idx_lookup = {i: idx for idx, i in enumerate(table_obs_ids)}
    otu_ids = [n.name for n in tree.tips()]
    otu_ids = list(set(otu_ids).intersection(set(table_obs_ids)))  # make sure OTUs overlap with table
    shuffle(otu_ids)
    otu_ids = otu_ids[:number_otus]
    
    # construct a MockTreeNode using skbio's TreeNode.shear method
    tree_array = MockTreeNode(tree.shear(otu_ids).to_array())
    
    # delete unnecessary references to the tree, drop the tree and request a cleanup
    del tree_array.original_tree_array['id_index']
    del tree
    import gc; gc.collect()
    
    # remove excess OTUs from the table
    table.filter(otu_ids, axis='observation')

    # remove samples without any OTUs (hopefully a small number...)
    table.filter(lambda v, i, md: v.sum() > 0)

    print("# spinuptime: %f" % (time() - start))
    print("# spinupmem: %f" % (process.memory_info().rss / 2**20))

    # if we dropped more than 10% of the desired samples due to the filtering above, bail out
    if ((number_samples - len(table.ids())) / float(number_samples)) > 0.1:
        print(number_samples)
        print(len(table.ids()))
        raise ValueError("Dropped too many samples!")

    # run the block method
    args = (tree_array, table, metric, block_size)
    block_start = time()
    (block_usage, block_result) = memory_usage((block_dist, args),
                                               interval=2, max_usage=True,
                                               retval=True)
    block_time = time() - block_start

    print("#number_otus\tnumber_samples\tblocksize\truntime\tpeakmem\tmethod\n")
    print('\t'.join([str(i) for i in [number_otus, number_samples,
                                      block_size, block_time,
                                      block_usage[-1], 'block']]))
    
    # run the normal method
    test_matrix = table.matrix_data.astype(int).T.toarray()
    args = (metric, test_matrix, table.ids())
    kwargs = {'tree': tree_array, 'otu_ids': table.ids(axis='observation'),
              'validate': False}
    normal_start = time()
    (usage, result) = memory_usage((beta_diversity, args, kwargs),
                                       interval=2, max_usage=True, retval=True)
    normal_time = time() - normal_start

    print('\t'.join([str(i) for i in [number_otus, number_samples,
                                      'NA', normal_time, usage[-1],
                                      'regular']]))

    if not np.allclose(block_result.data, result.data):
        print(block_result.ids == result.ids)
        print(block_result.data[:5, :5])
        print(result.data[:5, :5])
        raise ValueError

In [67]:
gg_tree = '/Users/daniel/miniconda3/envs/qiime191/lib/python2.7/site-packages/qiime_default_reference/gg_13_8_otus/trees/97_otus.tree'
ag_table = '/Users/daniel/rs/American-Gut/data/AG/AG_even1k.biom'
number_otus = 10000
number_samples = 1000
block_size = 256
metric = 'unweighted_unifrac'

bench(gg_tree, ag_table, number_otus, number_samples, block_size, metric)


# spinuptime: 24.513645
# spinupmem: 506.218750
0.8868274688720703
#number_otus	number_samples	blocksize	runtime	peakmem	method

10000	1000	256	54.04333019256592	506.3984375	block
10000	1000	NA	64.44073700904846	938.58984375	regular

In [51]:
# 10000 otus, 1000 samples, 128 blocksize on the AG data
# runtimes in seconds, block method vs regular:
# 118 v 65 on 1 core
# 60 v 65 on 2 core
# 39 v 65 on 3 core
# 35 v 65 on 4 core

# each thread peaks around 150MB

In [ ]:
# 10000 otus, 1000 samples, 64 blocksize on the AG data
# 64 v 65 on 3 core
# 45 v 62 on 4 core

# each thread peaks around 125MB

In [ ]:
# 10000 otus, 1000 samples, 256 blocksize on the AG data
# 30 v 62 on 4 core
# 54 v 63 on 2 core


# 10000 otus, 1000 samples, 512 blocksize on the AG data
# 62 v 62 on 2 core
# 62 v 62 on 4 core